Added drop database option
authorJeroen van der Heijden <jeroen@transceptor.technology>
Mon, 7 Jan 2019 15:19:25 +0000 (16:19 +0100)
committerJeroen van der Heijden <jeroen@transceptor.technology>
Mon, 7 Jan 2019 15:19:25 +0000 (16:19 +0100)
include/qpack/qpack.h
include/siri/db/db.h
include/siri/net/protocol.h
include/siri/service/request.h
include/xpath/xpath.h
src/siri/db/auth.c
src/siri/db/db.c
src/siri/net/bserver.c
src/siri/net/protocol.c
src/siri/service/request.c
src/xpath/xpath.c

index 2528a1beaf0bfd72c6c7f6e3255d44ca90915f5a..bb5168b682d1af1d3d62a1e03118ba6cf1fccf04 100644 (file)
@@ -160,6 +160,10 @@ static inline int qp_is_double(qp_types_t tp)
 {
     return tp == QP_DOUBLE;
 }
+static inline int qp_is_bool(qp_types_t tp)
+{
+    return tp == QP_TRUE || tp == QP_FALSE;
+}
 static inline int qp_is_raw_term(qp_obj_t * qp_obj)
 {
     return (qp_obj->tp == QP_RAW &&
index 952f96d89644ed9d380c7ceb3f428b4d266a81e2..d76c66ac1483dfccb4ea040c708575e818d034fb 100644 (file)
@@ -10,6 +10,7 @@ typedef struct siridb_s siridb_t;
 #define SIRIDB_MAX_DBNAME_LEN 256  /*    255 + NULL     */
 #define SIRIDB_SCHEMA 5
 #define SIRIDB_FLAG_REINDEXING 1
+#define SIRIDB_FLAG_DROPPED 2
 
 #define DEF_DROP_THRESHOLD 1.0              /* 100%         */
 #define DEF_SELECT_POINTS_LIMIT 1000000     /* one million  */
@@ -43,11 +44,13 @@ int8_t siridb_get_idle_percentage(siridb_t * siridb);
 int siridb_is_db_path(const char * dbpath);
 siridb_t * siridb_new(const char * dbpath, int lock_flags);
 siridb_t * siridb_get(llist_t * siridb_list, const char * dbname);
+siridb_t * siridb_get_by_qp(llist_t * siridb_list, qp_obj_t * qp_dbname);
 void siridb_decref_cb(siridb_t * siridb, void * args);
 ssize_t siridb_get_file(char ** buffer, siridb_t * siridb);
 int siridb_open_files(siridb_t * siridb);
 int siridb_save(siridb_t * siridb);
 void siridb__free(siridb_t * siridb);
+void siridb_drop(siridb_t * siridb);
 
 #define siridb_incref(siridb) siridb->ref++
 #define siridb_decref(_siridb) if (!--_siridb->ref) siridb__free(_siridb)
index 455f6c9885e88366c42263a4ed0ef32730f8adbf..f5a1f147b600d47a7d3091bfec42cc9595c4933c 100644 (file)
@@ -78,6 +78,7 @@ typedef enum
     BPROTO_ENABLE_BACKUP_MODE,          /* empty                            */
     BPROTO_DISABLE_BACKUP_MODE,         /* empty                            */
     BPROTO_TEE_PIPE_NAME_UPDATE,        /* tee pipe name                    */
+    BPROTO_DROP_DATABASE,               /* empty                            */
 } bproto_client_t;
 
 /*
@@ -125,7 +126,8 @@ typedef enum
     BPROTO_ACK_ENABLE_BACKUP_MODE,              /* empty                    */
     BPROTO_ACK_DISABLE_BACKUP_MODE,             /* empty                    */
     BPROTO_RES_GROUPS,                          /* [[name, series], ...]    */
-    BPROTO_ACK_TEE_PIPE_NAME                    /* empty                    */
+    BPROTO_ACK_TEE_PIPE_NAME,                   /* empty                    */
+    BPROTO_ACK_DROP_DATABASE,                   /* empty                    */
 
 } bproto_server_t;
 
index afececf22ec0eb4cda9f3e8db995ee9e8200110c..f991012ba50baa467a7388f5da9e1a7b997eae9e 100644 (file)
@@ -12,6 +12,7 @@ typedef enum
     SERVICE_NEW_DATABASE_,
     SERVICE_NEW_POOL,
     SERVICE_NEW_REPLICA,
+    SERVICE_DROP_DATABASE,
     SERVICE_GET_VERSION=64,
     SERVICE_GET_ACCOUNTS,
     SERVICE_GET_DATABASES
index 9524ef99606f420f8c484d7672c787a2495c9617..9c98df1d6d5ed72cf3c1bddf0b0e0d2e3b665a4b 100644 (file)
@@ -17,5 +17,6 @@ int xpath_file_exist(const char * fn);
 int xpath_is_dir(const char * path);
 ssize_t xpath_get_content(char ** buffer, const char * fn);
 int xpath_get_exec_path(char * path);
+int xpath_rmdir(const char * path);
 
 #endif  /* XPATH_H_ */
index 5a72c5f16397da5ced91876a820be0fa30d25cac..6d3ff9dcbac4a4ad2e4ddeb09debe91d79e6e634 100644 (file)
@@ -23,10 +23,6 @@ cproto_server_t siridb_auth_user_request(
     siridb_t * siridb;
     siridb_user_t * user;
 
-    char dbname[qp_dbname->len + 1];
-    memcpy(dbname, qp_dbname->via.raw, qp_dbname->len);
-    dbname[qp_dbname->len] = 0;
-
     char username[qp_username->len + 1];
     memcpy(username, qp_username->via.raw, qp_username->len);
     username[qp_username->len] = 0;
@@ -35,7 +31,7 @@ cproto_server_t siridb_auth_user_request(
     memcpy(password, qp_password->via.raw, qp_password->len);
     password[qp_password->len] = 0;
 
-    if ((siridb = siridb_get(siri.siridb_list, dbname)) == NULL)
+    if ((siridb = siridb_get_by_qp(siri.siridb_list, qp_dbname)) == NULL)
     {
         log_warning("User authentication request failed: unknown database");
         return CPROTO_ERR_AUTH_UNKNOWN_DB;
index bc1052176038a4aa2a4479ed0bc37c98deaa8d35..59da730b95f5048eac319301e8c69827dd17d0bb 100644 (file)
@@ -48,6 +48,7 @@ static int siridb__from_unpacker(
 static siridb_t * siridb__from_dat(const char * dbpath);
 static int siridb__read_conf(siridb_t * siridb);
 static int siridb__lock(const char * dbpath, int lock_flags);
+static inline int siridb__cmp_db(siridb_t * siridb, qp_obj_t * dbname);
 
 #define READ_DB_EXIT_WITH_ERROR(ERROR_MSG)  \
     strcpy(err_msg, ERROR_MSG);             \
@@ -529,6 +530,34 @@ siridb_t * siridb_get(llist_t * siridb_list, const char * dbname)
     return NULL;
 }
 
+/*
+ * Get a siridb object by qpack name.
+ */
+siridb_t * siridb_get_by_qp(llist_t * siridb_list, qp_obj_t * qp_dbname)
+{
+    assert (qp_dbname->tp == QP_RAW);
+
+    llist_node_t * node = siridb_list->first;
+    siridb_t * siridb;
+
+    while (node != NULL)
+    {
+        siridb = (siridb_t *) node->data;
+        if (qp_dbname->len == strlen(siridb->dbname) &&
+            strncmp(
+                siridb->dbname,
+                (const char *) qp_dbname->via.raw,
+                qp_dbname->len) == 0)
+        {
+            return siridb;
+        }
+        node = node->next;
+    }
+
+    return NULL;
+}
+
+
 /*
  * Sometimes we need a callback function and cannot use a macro expansion.
  */
@@ -706,14 +735,53 @@ void siridb__free(siridb_t * siridb)
         }
     }
 
+    uv_mutex_destroy(&siridb->series_mutex);
+    uv_mutex_destroy(&siridb->shards_mutex);
+
+    if (siridb->flags & SIRIDB_FLAG_DROPPED)
+    {
+        xpath_rmdir(siridb->dbpath);
+    }
+
     free(siridb->dbpath);
     free(siridb->dbname);
     free(siridb->time);
+    free(siridb);
+}
 
-    uv_mutex_destroy(&siridb->series_mutex);
-    uv_mutex_destroy(&siridb->shards_mutex);
+void siridb_drop(siridb_t * siridb)
+{
+    if (siridb->flags & SIRIDB_FLAG_DROPPED)
+    {
+        return;
+    }
 
-    free(siridb);
+    log_warning("dropping database '%s'", siridb->dbname);
+
+    siridb->flags |= SIRIDB_FLAG_DROPPED;
+
+    uv_mutex_lock(&siri.siridb_mutex);
+
+    (void *) llist_remove(siri.siridb_list, NULL, siridb);
+
+    uv_mutex_unlock(&siri.siridb_mutex);
+
+    if (siridb->replicate != NULL)
+    {
+        siridb_replicate_close(siridb->replicate);
+    }
+
+    if (siridb->reindex != NULL && siridb->reindex->timer != NULL)
+    {
+        siridb_reindex_close(siridb->reindex);
+    }
+
+    if (siridb->groups != NULL)
+    {
+        siridb_groups_destroy(siridb->groups);
+    }
+
+    siridb_decref(siridb);
 }
 
 /*
@@ -944,5 +1012,14 @@ static int siridb__lock(const char * dbpath, int lock_flags)
     return 0;
 }
 
+static inline int siridb__cmp_db(siridb_t * siridb, qp_obj_t * dbname)
+{
+    size_t len = strlen(siridb->dbname);
+    return (
+            dbname->len == len &&
+            strncmp(siridb->dbname, (const char *) dbname->via.raw, len) == 0
+    );
+}
+
 
 
index 3854db7e630c41051210e957dc079a48876d99d0..b4792d446c61c32b10c082a3f34fe8c71ded24e3 100644 (file)
@@ -46,6 +46,7 @@ static void on_log_level_update(
 static void on_tee_pipe_name_update(
         sirinet_stream_t * client,
         sirinet_pkg_t * pkg);
+static void on_drop_database(sirinet_stream_t * client, sirinet_pkg_t * pkg);
 static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg);
 static void on_query(
         sirinet_stream_t * client,
@@ -279,6 +280,9 @@ static void on_data(sirinet_stream_t * client, sirinet_pkg_t * pkg)
     case BPROTO_TEE_PIPE_NAME_UPDATE:
         on_tee_pipe_name_update(client, pkg);
         break;
+    case BPROTO_DROP_DATABASE:
+        on_drop_database(client, pkg);
+        break;
     }
 
 }
@@ -458,6 +462,22 @@ static void on_tee_pipe_name_update(
     }
 }
 
+static void on_drop_database(sirinet_stream_t * client, sirinet_pkg_t * pkg)
+{
+    SERVER_CHECK_AUTHENTICATED(client, server)
+
+    siridb_t * siridb = client->siridb;
+    sirinet_pkg_t * package = NULL;
+
+    siridb_drop(siridb);
+
+    package = sirinet_pkg_new(pkg->pid, 0, BPROTO_ACK_DROP_DATABASE, NULL);
+    if (package != NULL)
+    {
+        sirinet_pkg_send(client, package);
+    }
+}
+
 static void on_repl_finished(sirinet_stream_t * client, sirinet_pkg_t * pkg)
 {
     SERVER_CHECK_AUTHENTICATED(client, server)
index b9b6122bf6b8da26cd10eaa7ff9c2a1761a70173..e0572c0d177d7f15c2a522f3180f1f3ba1c4723c 100644 (file)
@@ -85,6 +85,7 @@ const char * sirinet_bproto_client_str(bproto_client_t n)
     case BPROTO_ENABLE_BACKUP_MODE: return "BPROTO_ENABLE_BACKUP_MODE";
     case BPROTO_DISABLE_BACKUP_MODE: return "BPROTO_DISABLE_BACKUP_MODE";
     case BPROTO_TEE_PIPE_NAME_UPDATE: return "BPROTO_TEE_PIPE_NAME_UPDATE";
+    case BPROTO_DROP_DATABASE: return "BPROTO_DROP_DATABASE";
     default:
         sprintf(protocol_str, "BPROTO_CLIENT_TYPE_UNKNOWN (%d)", n);
         return protocol_str;
@@ -121,6 +122,7 @@ const char * sirinet_bproto_server_str(bproto_server_t n)
     case BPROTO_ACK_DISABLE_BACKUP_MODE: return "BPROTO_ACK_DISABLE_BACKUP_MODE";
     case BPROTO_RES_GROUPS: return "BPROTO_RES_GROUPS";
     case BPROTO_ACK_TEE_PIPE_NAME: return "BPROTO_ACK_TEE_PIPE_NAME";
+    case BPROTO_ACK_DROP_DATABASE: return "BPROTO_ACK_DROP_DATABASE";
     default:
         sprintf(protocol_str, "BPROTO_SERVER_TYPE_UNKNOWN (%d)", n);
         return protocol_str;
index c60ab7a6eb18f8e2908cf59d5ffd60a8c57b81b1..1ca029d984f0125b45183c8537e0c2e5325a5ef0 100644 (file)
@@ -3,21 +3,22 @@
  */
 #define PCRE2_CODE_UNIT_WIDTH 8
 
+#include <lock/lock.h>
+#include <logger/logger.h>
+#include <pcre2.h>
+#include <siri/db/buffer.h>
+#include <siri/db/reindex.h>
+#include <siri/db/server.h>
+#include <siri/db/servers.h>
 #include <siri/service/account.h>
 #include <siri/service/client.h>
-#include <stddef.h>
 #include <siri/service/request.h>
 #include <siri/siri.h>
-#include <logger/logger.h>
-#include <pcre2.h>
-#include <lock/lock.h>
-#include <xmath/xmath.h>
+#include <siri/version.h>
+#include <stddef.h>
 #include <unistd.h>
 #include <uuid/uuid.h>
-#include <siri/db/server.h>
-#include <siri/db/buffer.h>
-#include <siri/version.h>
-#include <siri/db/reindex.h>
+#include <xmath/xmath.h>
 
 #define DEFAULT_TIME_PRECISION 1
 #define DEFAULT_BUFFER_SIZE 1024
@@ -142,6 +143,9 @@ static cproto_server_t SERVICE_on_drop_account(
         qp_unpacker_t * qp_unpacker,
         qp_obj_t * qp_account,
         char * err_msg);
+static cproto_server_t SERVICE_on_drop_database(
+        qp_unpacker_t * qp_unpacker,
+        char * err_msg);
 static cproto_server_t SERVICE_on_new_database(
         qp_unpacker_t * qp_unpacker,
         char * err_msg);
@@ -170,6 +174,7 @@ static int SERVICE_find_database(siridb_t * siridb, qp_obj_t * dbname);
 static int SERVICE_list_accounts(
         siri_service_account_t * account,
         qp_packer_t * packer);
+static void SERVICE_on_drop_database_cb(vec_t *, void *);
 
 static size_t max_filename_sz;
 
@@ -263,6 +268,8 @@ cproto_server_t siri_service_request(
                 client,
                 SERVICE_NEW_REPLICA,
                 err_msg);
+    case SERVICE_DROP_DATABASE:
+        return SERVICE_on_drop_database(qp_unpacker, err_msg);
     case SERVICE_GET_VERSION:
         return SERVICE_on_get_version(qp_unpacker, packaddr, err_msg);
     case SERVICE_GET_ACCOUNTS:
@@ -438,6 +445,101 @@ static cproto_server_t SERVICE_on_drop_account(
                     CPROTO_ERR_SERVICE : CPROTO_ACK_SERVICE;
 }
 
+/*
+ * Returns CPROTO_ACK_SERVICE when successful.
+ * In case of an error CPROTO_ERR_SERVICE can be returned in which case err_msg
+ * is set, or CPROTO_ERR_SERVICE_INVALID_REQUEST is returned.
+ */
+static cproto_server_t SERVICE_on_drop_database(
+        qp_unpacker_t * qp_unpacker,
+        char * err_msg)
+{
+    _Bool ignore_offline;
+    siridb_t * siridb;
+    qp_obj_t qp_key, qp_target, qp_ignore_offline;
+    sirinet_pkg_t * pkg;
+    vec_t * servers;
+
+    qp_target.tp = QP_HOOK;
+    qp_ignore_offline.tp = QP_HOOK;
+
+    if (!qp_is_map(qp_next(qp_unpacker, NULL)))
+    {
+        return CPROTO_ERR_SERVICE_INVALID_REQUEST;
+    }
+
+    while (qp_next(qp_unpacker, &qp_key) == QP_RAW)
+    {
+        if (    strncmp(
+                    (const char *) qp_key.via.raw,
+                    "database",
+                    qp_key.len) == 0 &&
+                qp_next(qp_unpacker, &qp_target) == QP_RAW)
+        {
+            continue;
+        }
+
+        if (    strncmp(
+                    (const char *) qp_key.via.raw,
+                    "ignore_offline",
+                    qp_key.len) == 0 &&
+                qp_is_bool(qp_next(qp_unpacker, &qp_ignore_offline)))
+        {
+            continue;
+        }
+        return CPROTO_ERR_SERVICE_INVALID_REQUEST;
+    }
+
+    if (qp_target.tp == QP_HOOK)
+    {
+        return CPROTO_ERR_SERVICE_INVALID_REQUEST;
+    }
+
+    ignore_offline = (
+            qp_ignore_offline.tp != QP_HOOK &&
+            qp_ignore_offline.tp == QP_TRUE
+    );
+
+    siridb = siridb_get_by_qp(siri.siridb_list, &qp_target);
+    if (siridb == NULL)
+    {
+        sprintf(err_msg, "cannot find database '%.*s'",
+                (int) qp_target.len, (char *) qp_target.via.raw);
+        return CPROTO_ERR_SERVICE;
+    }
+
+    if (!ignore_offline && !siridb_servers_online(siridb))
+    {
+        sprintf(err_msg,
+                "at least one server is offline, "
+                "set `ignore_offline` to true if you want to "
+                "ignore offline servers");
+        return CPROTO_ERR_SERVICE;
+    }
+
+    pkg = sirinet_pkg_new(0, 0, BPROTO_DROP_DATABASE, NULL);
+    servers = siridb_servers_other2vec(siridb);
+    if (pkg == NULL || servers == NULL)
+    {
+        free(pkg);
+        vec_free(servers);
+        sprintf(err_msg, "memory allocation error");
+        return CPROTO_ERR_SERVICE;
+    }
+
+    siridb_servers_send_pkg(
+            servers,
+            pkg,
+            0,
+            SERVICE_on_drop_database_cb,
+            NULL);
+
+    siridb_drop(siridb);
+    vec_free(servers);
+
+    return CPROTO_ACK_SERVICE;
+}
+
 /*
  * Returns CPROTO_ACK_SERVICE when successful.
  * In case of an error CPROTO_ERR_SERVICE can be returned in which case err_msg
@@ -988,3 +1090,25 @@ static int SERVICE_list_accounts(
 {
     return qp_add_string(packer, account->account);
 }
+
+static void SERVICE_on_drop_database_cb(
+        vec_t * promises,
+        void * data __attribute__((unused)))
+{
+    log_debug("drop database has been send to all online servers");
+
+    if (promises != NULL)
+    {
+        size_t i;
+        sirinet_promise_t * promise;
+        for (i = 0; i < promises->len; i++)
+        {
+            promise = promises->data[i];
+            if (promise != NULL)
+            {
+                free(promise->data);
+                sirinet_promise_decref(promise);
+            }
+        }
+    }
+}
index 20d26edb790bb9d09d816db57bdcfa76de57e7af..8e1eb23eb8d665287ad1a23649bf4193f5e14384 100644 (file)
@@ -8,6 +8,7 @@
 #include <string.h>
 #include <sys/stat.h>
 #include <unistd.h>
+#include <dirent.h>
 #include <xpath/xpath.h>
 
 /*
@@ -104,3 +105,44 @@ int xpath_get_exec_path(char * path)
 
     return 0;
 }
+
+int xpath_rmdir(const char * path)
+{
+    DIR * d = opendir(path);
+    if (!d)
+        return -1;
+
+    size_t bufsz = 0, path_len = strlen(path);
+    const char * slash = (path[path_len - 1] == '/') ? "" : "/";
+    struct dirent * p;
+    char * buf = NULL;
+
+    while ((p = readdir(d)))
+    {
+        size_t len;
+
+        /* Skip the names "." and ".." as we don't want to recurse on them. */
+        if (!strcmp(p->d_name, ".") || !strcmp(p->d_name, ".."))
+            continue;
+
+        len = path_len + strlen(p->d_name) + 2;
+        if (len > bufsz)
+        {
+            bufsz = len;
+            char * tmp = realloc(buf, bufsz);
+            if (!tmp) goto stop;
+            buf = tmp;
+        }
+
+        snprintf(buf, len, "%s%s%s", path, slash, p->d_name);
+
+        if (xpath_is_dir(buf) ? xpath_rmdir(buf) : unlink(buf))
+            goto stop;
+    }
+
+stop:
+    free(buf);
+    closedir(d);
+
+    return rmdir(path);
+}